1 /*
2  * Collie - An asynchronous event-driven network framework using Dlang development
3  *
4  * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd 
5  *
6  * Developer: putao's Dlang team
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module collie.bootstrap.clientmanger;
12 
13 import collie.net;
14 import collie.channel;
15 import kiss.event.timer.common;
16 import collie.utils.memory;
17 import kiss.util.functional;
18 import collie.exception;
19 import collie.utils.exception;
20 import collie.net.client.linklogInfo;
21 import std.exception;
22 
23 public import kiss.net.TcpStream;
24 import kiss.event.task;
25 
/**
 * Manages a set of outgoing TCP client connections that share one event loop.
 *
 * Each successful connection is wrapped in a `ClientLink` and kept in an
 * intrusive doubly linked list whose head (`_list`) is a sentinel node.
 * Pending connection attempts are tracked in `_waitConnect`.
 *
 * Params:
 *   PipeLine = pipeline type produced by the configured pipeline factory.
 */
final class ClientManger(PipeLine)
{
	alias ClientConnection = ClientLink!PipeLine;
	alias PipeLineFactory = PipelineFactory!PipeLine;
	alias ClientCreatorCallBack = void delegate(TcpStream);
	alias ConnCallBack = void delegate(PipeLine);
	alias LinkManger = TLinkManger!ConnCallBack;
	alias LinklogInfo = LinkManger.LinklogInfo;

	/// Creates a manager bound to `loop`; the connection list starts with only its sentinel head.
	this(EventLoop loop)
	{
		_loop = loop;
		_list = new ClientConnection();
	}

	/// Destroys the heartbeat timer if one was started.
	~this()
	{
		if (_timer)
			_timer.destroy;
	}

	/// Sets a hook that is invoked with every freshly created `TcpStream` before it connects.
	void setClientCreatorCallBack(ClientCreatorCallBack cback)
	{
		_oncreator = cback;
	}

	/// Sets the factory used to build a pipeline for each established connection.
	void pipelineFactory(shared PipeLineFactory fac)
	{
		_factory = fac;
	}

	/**
	 * Schedules a connection attempt to `to`.
	 *
	 * The attempt itself is posted to the event loop as a task.
	 *
	 * Params:
	 *   to    = remote address to connect to.
	 *   cback = optional callback; it receives the new pipeline on success
	 *           (possibly null if the factory failed) or null once all
	 *           retries are exhausted.
	 */
	void connect(Address to, ConnCallBack cback = null)
	{
		LinklogInfo * tlogInfo = new LinklogInfo();
		tlogInfo.addr = to;
		tlogInfo.tryCount = 0;
		tlogInfo.cback = cback;
		_loop.postTask(newTask((){
				_waitConnect.addlogInfo(tlogInfo);
				connect(tlogInfo);
			}));
	}

	/// Closes every tracked connection.
	void close()
	{
		auto con = _list.next;
		_list.next = null;
		while(con) {
			// Fetch the successor first: closing a link may remove and free it.
			auto tcon = con;
			con = con.next;
			tcon.close();
		}
	}

	/// Maximum number of reconnect attempts made after a failed connect.
	@property tryCount(){return _tryCount;}
	/// ditto
	@property tryCount(uint count){_tryCount = count;}

	alias heartbeatTimeOut = startTimeOut;

	/**
	 * Starts the heartbeat timer with a timeout of `s` seconds.
	 *
	 * The timer is not precise, so allow for some error in the timeout.
	 *
	 * Returns: false if `s` is 0 or a timer already exists, otherwise the
	 *          result of starting the timer.
	 */
	bool startTimeOut(uint s)
	{
		return getTimeWheelConfig(s);
	}

	/// The event loop this manager runs on.
	@property EventLoop eventLoop()
	{
		return _loop;
	}

protected:
	/// Creates a new `TcpStream` for `logInfo`, wires up its handlers and starts connecting.
	void connect(LinklogInfo * logInfo)
	{
		logInfo.client = new TcpStream(_loop);
		if(_oncreator)
			_oncreator(logInfo.client);
		// Placeholder close/read handlers are installed for the connecting phase.
		logInfo.client.setCloseHandle(&tmpCloseCallBack);
		logInfo.client.setConnectHandle(bind(&connectCallBack,logInfo));
		logInfo.client.setReadHandle(&tmpReadCallBack);
		logInfo.client.connect(logInfo.addr);
	}

	/**
	 * Handles the outcome of a connection attempt.
	 *
	 * On success a pipeline is built, the user callback is notified and the
	 * connection is linked into the list. On failure the attempt is retried
	 * until `tryCount` is exhausted, after which the callback receives null.
	 */
	void connectCallBack(LinklogInfo * tlogInfo,bool isconnect) nothrow @trusted
	{
		catchAndLogException((){
		import std.exception;
		if(tlogInfo is null)return;
		if(isconnect){
			// The attempt is no longer pending, however this branch exits.
			scope(exit){
				_waitConnect.rmlogInfo(tlogInfo);
			}
			PipeLine pipe = null;
			// A throwing factory leaves `pipe` null; the callback still runs.
			collectException(_factory.newPipeline(tlogInfo.client),pipe);
			if(tlogInfo.cback)
				tlogInfo.cback(pipe);
			if(pipe is null)return;
			ClientConnection con = new ClientConnection(this,pipe);
			// The wheel only exists after startTimeOut() has been called;
			// without this guard connecting with no heartbeat configured
			// dereferences null.
			if(_wheel !is null)
				_wheel.addNewTimer(con);

			// Insert right after the sentinel head.
			con.next = _list.next;
			if(con.next)
				con.next.prev = con;
			con.prev = _list;
			_list.next = con;

			con.initialize();

		} else {// Retry while attempts remain; otherwise release the resources.
			tlogInfo.client = null;
			if(tlogInfo.tryCount < _tryCount) {
				tlogInfo.tryCount ++;
				connect(tlogInfo);
			}else{
				// Save the callback before the record is freed.
				auto cback = tlogInfo.cback;
				_waitConnect.rmlogInfo(tlogInfo);
				gcFree(tlogInfo);
				if(cback)
					cback(null);
			}
		}
		}());
	}

	/// No-op close handler used while a stream is still connecting.
	void tmpCloseCallBack() nothrow{}

	/// No-op read handler used while a stream is still connecting.
	void tmpReadCallBack(in ubyte[] buffer) nothrow{}

	/// Unlinks `con` from the connection list and frees it.
	void remove(ClientConnection con)
	{
		con.prev.next = con.next;
		if(con.next)
			con.next.prev = con.prev;
		gcFree(con);
	}

	/**
	 * Picks a wheel size and tick interval for a timeout of `_timeOut`
	 * seconds, then creates and starts the timer and timing wheel.
	 *
	 * Returns: false if `_timeOut` is 0 or a timer already exists,
	 *          otherwise the result of `_timer.start`.
	 */
	bool getTimeWheelConfig(uint _timeOut)
	{
		if (_timeOut == 0)
			return false;
		// Only one timer per manager; a second call is rejected.
		if (_timer)
			return false;

		// Longer timeouts get more slots.
		uint wheelSize;
		if (_timeOut <= 40)
			wheelSize = 50;
		else if (_timeOut <= 120)
			wheelSize = 60;
		else if (_timeOut <= 600)
			wheelSize = 100;
		else if (_timeOut < 1000)
			wheelSize = 150;
		else
			wheelSize = 180;

		// Tick interval passed to the timer: seconds * 1000 spread over the slots.
		uint time = _timeOut * 1000 / wheelSize;

		_timer = new KissTimer(_loop);
		_wheel = new TimingWheel(wheelSize);
		_timer.setTimerHandle(()nothrow{_wheel.prevWheel();});
		return _timer.start(time);
	}

private:
	//int[ClientConnection] _list;
	ClientConnection _list;   // sentinel head of the connection list
	LinkManger _waitConnect;  // pending connection attempts

	shared PipeLineFactory _factory;
	TimingWheel _wheel;       // null until startTimeOut() succeeds in creating it
	KissTimer _timer;
	EventLoop _loop;

	uint _tryCount;
	ClientCreatorCallBack _oncreator;
}
211 
212 package:
213 
/**
 * One established client connection: a node in the owning manager's
 * doubly linked list that also acts as the pipeline's manager and as a
 * timing-wheel entry.
 */
final @trusted class ClientLink(PipeLine) : WheelTimer, PipelineManager
{
	alias ConnectionManger = ClientManger!PipeLine;

	/// Signals the pipeline that the transport is active.
	pragma(inline, true)
	void initialize()
	{
		_pipe.transportActive();
	}

	/// Signals the pipeline that the transport is inactive.
	pragma(inline, true)
	void close()
	{
		_pipe.transportInactive();
	}

	/// Forwards a timeout to the pipeline; any exception is reported, never propagated.
	override void onTimeOut() nothrow
	{
		try
		{
			_pipe.timeOut();
		}
		catch (Exception ex)
		{
			showException(ex);
		}
	}

	/// Re-arms this entry via the inherited `rest()`.
	override void refreshTimeout()
	{
		rest();
	}

	/// Detaches `pipeline` from this manager, stops the timer entry and removes this link from its owner.
	override void deletePipeline(PipelineBase pipeline)
	{
		pipeline.pipelineManager = null;
		stop();
		_manger.remove(this);
	}

protected:
	/// Binds the link to its owning manager and pipeline, then finalizes the pipeline.
	this(ConnectionManger manger, PipeLine pipe)
	{
		_manger = manger;
		_pipe = pipe;
		_pipe.finalize();
		_pipe.pipelineManager(this);
	}

private:
	/// Builds an empty node (used as the list's sentinel head).
	this()
	{
	}

	// Intrusive list links maintained by the owning manager.
	typeof(this) prev;
	typeof(this) next;

	ConnectionManger _manger;
	PipeLine _pipe;
	string _name;
}
265 
266 package:
/**
 * Bookkeeping record for one pending connection attempt.
 *
 * Params:
 *   TCallBack = delegate type notified with the outcome of the attempt.
 */
struct TLinklogInfo(TCallBack)
	if (is(TCallBack == delegate))
{
	/// Stream used for the current attempt.
	TcpStream client;
	/// Remote address to connect to.
	Address addr;
	/// Attempt counter; starts at zero.
	uint tryCount;
	/// User callback for the outcome.
	TCallBack cback;
}